﻿using iTool.Common;
using Orleans;
using Orleans.Providers.Streams.Common;
using Orleans.Streams;
using System;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    /// <summary>
    /// Base class for consuming messages of type <typeparamref name="T"/> from an Orleans stream.
    /// Derived classes implement <see cref="OnMessageAsync"/> and <see cref="OnErrorAsync"/>.
    /// In broadcast mode every message is passed straight to <see cref="OnMessageAsync"/>;
    /// otherwise the <c>IStreamQueueConsume</c> grain is asked first whether this message may be consumed.
    /// </summary>
    /// <typeparam name="T">Type of the messages carried by the stream.</typeparam>
    public abstract class SubscribeQueueHandler<T> : IDisposable
    {
        private IAsyncStream<T> stream;
        protected StreamSubscriptionHandle<T> streamHandle;
        private readonly IClusterClient iClient;
        private readonly Guid streamId;
        private readonly string streamNamespace;
        private readonly bool isBroadcast;

        /// <summary>
        /// Creates a handler on the "iToolSimpleStream" stream provider of the cluster client
        /// registered as "IClusterService". The handler runs in broadcast mode
        /// (the default of the other constructor).
        /// </summary>
        /// <param name="topic">Topic to subscribe to; converted to the stream id.</param>
        /// <param name="streamNamespace">Stream namespace.</param>
        public SubscribeQueueHandler(string topic, string streamNamespace) : this(
            new QueueProvider(iBox.GetService<IClusterClient>("IClusterService").GetStreamProvider("iToolSimpleStream")),
            topic,
            streamNamespace)
        {
        }

        /// <summary>
        /// Creates a handler for a stream.
        /// Resolution order: stream provider &gt; stream namespace &gt; topic.
        /// </summary>
        /// <param name="provider">Queue provider the stream is obtained from.</param>
        /// <param name="topic">Topic to subscribe to; converted to the stream id via <c>iUtils.ToGuid</c>.</param>
        /// <param name="streamNamespace">Stream namespace.</param>
        /// <param name="isBroadcast">
        /// When <c>true</c> (default) every message is handed to <see cref="OnMessageAsync"/> without
        /// consulting the consume grain, and <see cref="StartAsync"/> always subscribes without a start token.
        /// </param>
        public SubscribeQueueHandler(IQueueProvider provider, string topic, string streamNamespace, bool isBroadcast = true)
        {
            this.streamId = iUtils.ToGuid(topic);
            this.streamNamespace = streamNamespace;
            this.stream = provider.GetStream<T>(this.streamId, streamNamespace);
            this.iClient = iBox.GetService<IClusterClient>("IClusterService");
            this.isBroadcast = isBroadcast;
            Console.WriteLine("Subscribe topic:{0},streamNamespace:{1},streamId:{2}", topic, streamNamespace, this.streamId);
        }

        /// <summary>
        /// Starts consuming.
        /// </summary>
        /// <param name="offset">
        /// Sequence number to start consuming from. A negative value (default) subscribes without a
        /// start token. In broadcast mode the value is not used for the subscription itself.
        /// </param>
        /// <returns>A task that completes once the subscription (and offset reset, if any) is done.</returns>
        public async Task StartAsync(long offset = -1)
        {
            if (offset < 0 || this.isBroadcast)
            {
                this.streamHandle = await this.stream.SubscribeAsync(this.OnNextAsync, this.OnErrorAsync);
            }
            else
            {
                this.streamHandle = await this.stream.SubscribeAsync<T>(this.OnNextAsync, this.OnErrorAsync, new EventSequenceToken(offset));
            }

            // NOTE(review): the offset is reset on one grain keyed (Guid.Empty, "Aggregation"), not on a
            // grain keyed by this stream's id/namespace, and it is reset even in broadcast mode. An offset
            // of exactly 0 subscribes from token 0 but does not reset the grain. Confirm this is intended.
            if (offset > 0)
            {
                var queueOffset = this.iClient.GetGrain<IStreamQueueOffset>(Guid.Empty, "Aggregation");
                await queueOffset.ReSetOffset(offset);
            }
        }

        /// <summary>
        /// Called when the stream reports an error.
        /// </summary>
        /// <param name="ex">The reported error.</param>
        /// <returns>A task representing the error handling.</returns>
        public abstract Task OnErrorAsync(Exception ex);

        /// <summary>
        /// Stream callback for each delivered message. Dispatches to <see cref="OnMessageAsync"/>,
        /// in non-broadcast mode only after the consume grain allows it.
        /// </summary>
        /// <param name="message">The delivered message.</param>
        /// <param name="token">Sequence token of the message; used to build the message key in non-broadcast mode.</param>
        /// <returns>A task representing the processing of the message.</returns>
        public async Task OnNextAsync(T message, StreamSequenceToken token)
        {
            if (this.isBroadcast)
            {
                await this.OnMessageAsync(message, token);
                return;
            }

            // The consume grain is only needed on the non-broadcast path.
            var queueConsume = this.iClient.GetGrain<IStreamQueueConsume>(this.streamId, this.streamNamespace);

            // NOTE(review): token is dereferenced without a null check; confirm the configured
            // stream provider always supplies a sequence token.
            string messageKey = $"{token.SequenceNumber}_{token.EventIndex}";
            if (!await queueConsume.IsCanConsumeMessageAsync(messageKey))
            {
                return;
            }

            try
            {
                await this.OnMessageAsync(message, token);
            }
            catch (Exception ex)
            {
                // Report the failed message to the consume grain, then let the error propagate.
                await queueConsume.ConsumeMessageErrorAsync(messageKey);
                Console.WriteLine($"OnMessageAsync Error: {ex.Message}");
                throw; // rethrow without resetting the original stack trace
            }
        }

        /// <summary>
        /// Handles one message. Implemented by derived classes.
        /// </summary>
        /// <param name="message">The message to handle.</param>
        /// <param name="token">Sequence token of the message.</param>
        /// <returns>A task representing the handling of the message.</returns>
        public abstract Task OnMessageAsync(T message, StreamSequenceToken token);

        /// <summary>
        /// Gracefully cancels the subscription. Does nothing when there is no active subscription handle.
        /// </summary>
        /// <returns>A task that completes when the unsubscribe call has finished.</returns>
        public async Task UnsubscribeAsync()
        {
            var handle = this.streamHandle;
            if (handle != null)
            {
                await handle.UnsubscribeAsync();

                // Clear the handle so a later UnsubscribeAsync()/Dispose() does not
                // issue a second unsubscribe on the same handle.
                this.streamHandle = null;
            }
        }

        /// <summary>
        /// Unsubscribes (blocking until the unsubscribe completes) and drops the stream references.
        /// </summary>
        public void Dispose()
        {
            // IDisposable.Dispose is synchronous, so this blocks on the asynchronous unsubscribe.
            Task.WaitAll(this.UnsubscribeAsync());
            this.streamHandle = null;
            this.stream = null;
        }
    }
}
